home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.4) import unittest import pickle import cPickle import pickletools import copy_reg from test.test_support import TestFailed, have_unicode, TESTFN if cPickle.HIGHEST_PROTOCOL == cPickle.HIGHEST_PROTOCOL: pass elif not cPickle.HIGHEST_PROTOCOL == 2: raise AssertionError protocols = range(pickle.HIGHEST_PROTOCOL + 1) def opcode_in_pickle(code, pickle): for op, dummy, dummy in pickletools.genops(pickle): if op.code == code: return True continue return False def count_opcode(code, pickle): n = 0 for op, dummy, dummy in pickletools.genops(pickle): if op.code == code: n += 1 continue return n class ExtensionSaver: def __init__(self, code): self.code = code if code in copy_reg._inverted_registry: self.pair = copy_reg._inverted_registry[code] copy_reg.remove_extension(self.pair[0], self.pair[1], code) else: self.pair = None def restore(self): code = self.code curpair = copy_reg._inverted_registry.get(code) if curpair is not None: copy_reg.remove_extension(curpair[0], curpair[1], code) pair = self.pair if pair is not None: copy_reg.add_extension(pair[0], pair[1], code) class C: def __cmp__(self, other): return cmp(self.__dict__, other.__dict__) import __main__ __main__.C = C C.__module__ = '__main__' class myint(int): def __init__(self, x): self.str = str(x) class initarg(C): def __init__(self, a, b): self.a = a self.b = b def __getinitargs__(self): return (self.a, self.b) class metaclass(type): pass class use_metaclass(object): __metaclass__ = metaclass DATA0 = '(lp1\nI0\naL1L\naF2\nac__builtin__\ncomplex\np2\n' + '(F3\nF0\ntRp3\naI1\naI-1\naI255\naI-255\naI-256\naI65535\naI-65535\naI-65536\naI2147483647\naI-2147483647\naI-2147483648\na' + "(S'abc'\np4\ng4\n" + '(i__main__\nC\np5\n' + "(dp6\nS'foo'\np7\nI1\nsS'bar'\np8\nI2\nsbg5\ntp9\nag9\naI5\na.\n" DATA0_DIS = " 0: ( MARK\n 1: l LIST (MARK at 0)\n 2: p PUT 1\n 5: I INT 0\n 8: a APPEND\n 9: L LONG 1L\n 13: a APPEND\n 14: F FLOAT 2.0\n 17: a APPEND\n 18: c GLOBAL '__builtin__ complex'\n 39: p PUT 2\n 42: ( MARK\n 43: F FLOAT 3.0\n 46: F FLOAT 0.0\n 49: t TUPLE (MARK at 42)\n 50: R REDUCE\n 51: p PUT 3\n 54: a APPEND\n 55: I INT 1\n 58: a APPEND\n 59: I INT -1\n 63: a APPEND\n 64: I INT 255\n 69: a APPEND\n 70: I INT -255\n 76: a APPEND\n 77: I INT -256\n 83: a APPEND\n 84: I INT 65535\n 91: a APPEND\n 92: I INT -65535\n 100: a APPEND\n 101: I INT -65536\n 109: a APPEND\n 110: I INT 2147483647\n 122: a APPEND\n 123: I INT -2147483647\n 136: a APPEND\n 137: I INT -2147483648\n 150: a APPEND\n 151: ( MARK\n 152: S STRING 'abc'\n 159: p PUT 4\n 162: g GET 4\n 165: ( MARK\n 166: i INST '__main__ C' (MARK at 165)\n 178: p PUT 5\n 181: ( MARK\n 182: d DICT (MARK at 181)\n 183: p PUT 6\n 186: S STRING 'foo'\n 193: p PUT 7\n 196: I INT 1\n 199: s SETITEM\n 200: S STRING 'bar'\n 207: p PUT 8\n 210: I INT 2\n 213: s SETITEM\n 214: b BUILD\n 215: g GET 5\n 218: t TUPLE (MARK at 151)\n 219: p PUT 9\n 222: a APPEND\n 223: g GET 9\n 226: a APPEND\n 227: I INT 5\n 230: a APPEND\n 231: . STOP\nhighest protocol among opcodes = 0\n" DATA1 = ']q\x01(K\x00L1L\nG@\x00\x00\x00\x00\x00\x00\x00c__builtin__\ncomplex\nq\x02(G@\x08\x00\x00\x00\x00\x00\x00G\x00\x00\x00\x00\x00\x00\x00\x00tRq\x03K\x01J\xff\xff\xff\xffK\xffJ\x01\xff\xff\xffJ\x00\xff\xff\xffM\xff\xffJ\x01\x00\xff\xffJ\x00\x00\xff\xffJ\xff\xff\xff\x7fJ\x01\x00\x00\x80J\x00\x00\x00\x80(U\x03abcq\x04h\x04(c__main__\nC\nq\x05oq\x06}q\x07(U\x03fooq\x08K\x01U\x03barq\tK\x02ubh\x06tq\nh\nK\x05e.' DATA1_DIS = " 0: ] EMPTY_LIST\n 1: q BINPUT 1\n 3: ( MARK\n 4: K BININT1 0\n 6: L LONG 1L\n 10: G BINFLOAT 2.0\n 19: c GLOBAL '__builtin__ complex'\n 40: q BINPUT 2\n 42: ( MARK\n 43: G BINFLOAT 3.0\n 52: G BINFLOAT 0.0\n 61: t TUPLE (MARK at 42)\n 62: R REDUCE\n 63: q BINPUT 3\n 65: K BININT1 1\n 67: J BININT -1\n 72: K BININT1 255\n 74: J BININT -255\n 79: J BININT -256\n 84: M BININT2 65535\n 87: J BININT -65535\n 92: J BININT -65536\n 97: J BININT 2147483647\n 102: J BININT -2147483647\n 107: J BININT -2147483648\n 112: ( MARK\n 113: U SHORT_BINSTRING 'abc'\n 118: q BINPUT 4\n 120: h BINGET 4\n 122: ( MARK\n 123: c GLOBAL '__main__ C'\n 135: q BINPUT 5\n 137: o OBJ (MARK at 122)\n 138: q BINPUT 6\n 140: } EMPTY_DICT\n 141: q BINPUT 7\n 143: ( MARK\n 144: U SHORT_BINSTRING 'foo'\n 149: q BINPUT 8\n 151: K BININT1 1\n 153: U SHORT_BINSTRING 'bar'\n 158: q BINPUT 9\n 160: K BININT1 2\n 162: u SETITEMS (MARK at 143)\n 163: b BUILD\n 164: h BINGET 6\n 166: t TUPLE (MARK at 112)\n 167: q BINPUT 10\n 169: h BINGET 10\n 171: K BININT1 5\n 173: e APPENDS (MARK at 3)\n 174: . STOP\nhighest protocol among opcodes = 1\n" DATA2 = '\x80\x02]q\x01(K\x00\x8a\x01\x01G@\x00\x00\x00\x00\x00\x00\x00c__builtin__\ncomplex\nq\x02G@\x08\x00\x00\x00\x00\x00\x00G\x00\x00\x00\x00\x00\x00\x00\x00\x86Rq\x03K\x01J\xff\xff\xff\xffK\xffJ\x01\xff\xff\xffJ\x00\xff\xff\xffM\xff\xffJ\x01\x00\xff\xffJ\x00\x00\xff\xffJ\xff\xff\xff\x7fJ\x01\x00\x00\x80J\x00\x00\x00\x80(U\x03abcq\x04h\x04(c__main__\nC\nq\x05oq\x06}q\x07(U\x03fooq\x08K\x01U\x03barq\tK\x02ubh\x06tq\nh\nK\x05e.' DATA2_DIS = " 0: \x80 PROTO 2\n 2: ] EMPTY_LIST\n 3: q BINPUT 1\n 5: ( MARK\n 6: K BININT1 0\n 8: \x8a LONG1 1L\n 11: G BINFLOAT 2.0\n 20: c GLOBAL '__builtin__ complex'\n 41: q BINPUT 2\n 43: G BINFLOAT 3.0\n 52: G BINFLOAT 0.0\n 61: \x86 TUPLE2\n 62: R REDUCE\n 63: q BINPUT 3\n 65: K BININT1 1\n 67: J BININT -1\n 72: K BININT1 255\n 74: J BININT -255\n 79: J BININT -256\n 84: M BININT2 65535\n 87: J BININT -65535\n 92: J BININT -65536\n 97: J BININT 2147483647\n 102: J BININT -2147483647\n 107: J BININT -2147483648\n 112: ( MARK\n 113: U SHORT_BINSTRING 'abc'\n 118: q BINPUT 4\n 120: h BINGET 4\n 122: ( MARK\n 123: c GLOBAL '__main__ C'\n 135: q BINPUT 5\n 137: o OBJ (MARK at 122)\n 138: q BINPUT 6\n 140: } EMPTY_DICT\n 141: q BINPUT 7\n 143: ( MARK\n 144: U SHORT_BINSTRING 'foo'\n 149: q BINPUT 8\n 151: K BININT1 1\n 153: U SHORT_BINSTRING 'bar'\n 158: q BINPUT 9\n 160: K BININT1 2\n 162: u SETITEMS (MARK at 143)\n 163: b BUILD\n 164: h BINGET 6\n 166: t TUPLE (MARK at 112)\n 167: q BINPUT 10\n 169: h BINGET 10\n 171: K BININT1 5\n 173: e APPENDS (MARK at 5)\n 174: . STOP\nhighest protocol among opcodes = 2\n" def create_data(): c = C() c.foo = 1 c.bar = 2 x = [ 0, 0x1L, 2.0, 3.0 + (0.0+0.0j)] uint1max = 255 uint2max = 65535 int4max = 2147483647 x.extend([ 1, -1, uint1max, -uint1max, -uint1max - 1, uint2max, -uint2max, -uint2max - 1, int4max, -int4max, -int4max - 1]) y = ('abc', 'abc', c, c) x.append(y) x.append(y) x.append(5) return x class AbstractPickleTests(unittest.TestCase): _testdata = create_data() def setUp(self): pass def test_misc(self): for proto in protocols: x = myint(4) s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) x = (1, ()) s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) x = initarg(1, x) s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) def test_roundtrip_equality(self): expected = self._testdata for proto in protocols: s = self.dumps(expected, proto) got = self.loads(s) self.assertEqual(expected, got) def test_load_from_canned_string(self): expected = self._testdata for canned in (DATA0, DATA1, DATA2): got = self.loads(canned) self.assertEqual(expected, got) def dont_test_disassembly(self): StringIO = StringIO import cStringIO dis = dis import pickletools for proto, expected in ((0, DATA0_DIS), (1, DATA1_DIS)): s = self.dumps(self._testdata, proto) filelike = StringIO() dis(s, out = filelike) got = filelike.getvalue() self.assertEqual(expected, got) def test_recursive_list(self): l = [] l.append(l) for proto in protocols: s = self.dumps(l, proto) x = self.loads(s) self.assertEqual(len(x), 1) self.assert_(x is x[0]) def test_recursive_dict(self): d = { } d[1] = d for proto in protocols: s = self.dumps(d, proto) x = self.loads(s) self.assertEqual(x.keys(), [ 1]) self.assert_(x[1] is x) def test_recursive_inst(self): i = C() i.attr = i for proto in protocols: s = self.dumps(i, 2) x = self.loads(s) self.assertEqual(dir(x), dir(i)) self.assert_(x.attr is x) def test_recursive_multi(self): l = [] d = { 1: l } i = C() i.attr = d l.append(i) for proto in protocols: s = self.dumps(l, proto) x = self.loads(s) self.assertEqual(len(x), 1) self.assertEqual(dir(x[0]), dir(i)) self.assertEqual(x[0].attr.keys(), [ 1]) self.assert_(x[0].attr[1] is x) def test_garyp(self): self.assertRaises(self.error, self.loads, 'garyp') def test_insecure_strings(self): insecure = [ 'abc', '2 + 2', "'abc", '\'abc"', "'abc' ?", "'\\'"] for s in insecure: buf = 'S' + s + '\np0\n.' self.assertRaises(ValueError, self.loads, buf) if have_unicode: def test_unicode(self): endcases = [ unicode(''), unicode('<\\u>'), unicode('<\\\\u1234>'), unicode('<\n>'), unicode('<\\>')] for proto in protocols: for u in endcases: p = self.dumps(u, proto) u2 = self.loads(p) self.assertEqual(u2, u) def test_ints(self): import sys for proto in protocols: n = sys.maxint while n: for expected in (-n, n): s = self.dumps(expected, proto) n2 = self.loads(s) self.assertEqual(expected, n2) n = n >> 1 def test_maxint64(self): maxint64 = (0x1L << 63) - 1 data = 'I' + str(maxint64) + '\n.' got = self.loads(data) self.assertEqual(got, maxint64) data = 'I' + str(maxint64) + 'JUNK\n.' self.assertRaises(ValueError, self.loads, data) def test_long(self): for proto in protocols: for nbits in (1, 8, 8 * 254, 8 * 255, 8 * 256, 8 * 257): nbase = 0x1L << nbits for npos in (nbase - 1, nbase, nbase + 1): for n in (npos, -npos): pickle = self.dumps(n, proto) got = self.loads(pickle) self.assertEqual(n, got) nbase = long('deadbeeffeedface', 16) nbase += nbase << 1000000 for n in (nbase, -nbase): p = self.dumps(n, 2) got = self.loads(p) self.assertEqual(n, got) def test_reduce(self): pass def test_getinitargs(self): pass def test_metaclass(self): a = use_metaclass() for proto in protocols: s = self.dumps(a, proto) b = self.loads(s) self.assertEqual(a.__class__, b.__class__) def test_structseq(self): import time import os t = time.localtime() for proto in protocols: s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u) if hasattr(os, 'stat'): t = os.stat(os.curdir) s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u) if hasattr(os, 'statvfs'): t = os.statvfs(os.curdir) s = self.dumps(t, proto) u = self.loads(s) self.assertEqual(t, u) continue def test_proto(self): build_none = pickle.NONE + pickle.STOP for proto in protocols: expected = build_none if proto >= 2: expected = pickle.PROTO + chr(proto) + expected p = self.dumps(None, proto) self.assertEqual(p, expected) oob = protocols[-1] + 1 badpickle = pickle.PROTO + chr(oob) + build_none try: self.loads(badpickle) except ValueError: detail = None self.failUnless(str(detail).startswith('unsupported pickle protocol')) self.fail('expected bad protocol number to raise ValueError') def test_long1(self): x = 0x27E41B32C1EEF784CCA555E8L for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) self.assertEqual(opcode_in_pickle(pickle.LONG1, s), proto >= 2) def test_long4(self): x = 0x27E41B32C1EEF784CCA555E8L << 256 * 8 for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) self.assertEqual(opcode_in_pickle(pickle.LONG4, s), proto >= 2) def test_short_tuples(self): expected_opcode = { (0, 0): pickle.TUPLE, (0, 1): pickle.TUPLE, (0, 2): pickle.TUPLE, (0, 3): pickle.TUPLE, (0, 4): pickle.TUPLE, (1, 0): pickle.EMPTY_TUPLE, (1, 1): pickle.TUPLE, (1, 2): pickle.TUPLE, (1, 3): pickle.TUPLE, (1, 4): pickle.TUPLE, (2, 0): pickle.EMPTY_TUPLE, (2, 1): pickle.TUPLE1, (2, 2): pickle.TUPLE2, (2, 3): pickle.TUPLE3, (2, 4): pickle.TUPLE } a = () b = (1,) c = (1, 2) d = (1, 2, 3) e = (1, 2, 3, 4) for proto in protocols: for x in (a, b, c, d, e): s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y, (proto, x, s, y)) expected = expected_opcode[(proto, len(x))] self.assertEqual(opcode_in_pickle(expected, s), True) def test_singletons(self): expected_opcode = { (0, None): pickle.NONE, (1, None): pickle.NONE, (2, None): pickle.NONE, (0, True): pickle.INT, (1, True): pickle.INT, (2, True): pickle.NEWTRUE, (0, False): pickle.INT, (1, False): pickle.INT, (2, False): pickle.NEWFALSE } for proto in protocols: for x in (None, False, True): s = self.dumps(x, proto) y = self.loads(s) self.assert_(x is y, (proto, x, s, y)) expected = expected_opcode[(proto, x)] self.assertEqual(opcode_in_pickle(expected, s), True) def test_newobj_tuple(self): x = MyTuple([ 1, 2, 3]) x.foo = 42 x.bar = 'hello' for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(tuple(x), tuple(y)) self.assertEqual(x.__dict__, y.__dict__) def test_newobj_list(self): x = MyList([ 1, 2, 3]) x.foo = 42 x.bar = 'hello' for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(list(x), list(y)) self.assertEqual(x.__dict__, y.__dict__) def test_newobj_generic(self): for proto in protocols: for C in myclasses: B = C.__base__ x = C(C.sample) x.foo = 42 s = self.dumps(x, proto) y = self.loads(s) detail = (proto, C, B, x, y, type(y)) self.assertEqual(B(x), B(y), detail) self.assertEqual(x.__dict__, y.__dict__, detail) def produce_global_ext(self, extcode, opcode): e = ExtensionSaver(extcode) try: copy_reg.add_extension(__name__, 'MyList', extcode) x = MyList([ 1, 2, 3]) x.foo = 42 x.bar = 'hello' s1 = self.dumps(x, 1) self.assert_(__name__ in s1) self.assert_('MyList' in s1) self.assertEqual(opcode_in_pickle(opcode, s1), False) y = self.loads(s1) self.assertEqual(list(x), list(y)) self.assertEqual(x.__dict__, y.__dict__) s2 = self.dumps(x, 2) self.assert_(__name__ not in s2) self.assert_('MyList' not in s2) self.assertEqual(opcode_in_pickle(opcode, s2), True) y = self.loads(s2) self.assertEqual(list(x), list(y)) self.assertEqual(x.__dict__, y.__dict__) finally: e.restore() def test_global_ext1(self): self.produce_global_ext(1, pickle.EXT1) self.produce_global_ext(255, pickle.EXT1) def test_global_ext2(self): self.produce_global_ext(256, pickle.EXT2) self.produce_global_ext(65535, pickle.EXT2) self.produce_global_ext(43981, pickle.EXT2) def test_global_ext4(self): self.produce_global_ext(65536, pickle.EXT4) self.produce_global_ext(2147483647, pickle.EXT4) self.produce_global_ext(313249263, pickle.EXT4) def test_list_chunking(self): n = 10 x = range(n) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) num_appends = count_opcode(pickle.APPENDS, s) self.assertEqual(num_appends, proto > 0) n = 2500 x = range(n) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) num_appends = count_opcode(pickle.APPENDS, s) if proto == 0: self.assertEqual(num_appends, 0) continue self.failUnless(num_appends >= 2) def test_dict_chunking(self): n = 10 x = dict.fromkeys(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) num_setitems = count_opcode(pickle.SETITEMS, s) self.assertEqual(num_setitems, proto > 0) n = 2500 x = dict.fromkeys(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) num_setitems = count_opcode(pickle.SETITEMS, s) if proto == 0: self.assertEqual(num_setitems, 0) continue self.failUnless(num_setitems >= 2) def test_simple_newobj(self): x = object.__new__(SimpleNewObj) x.abc = 666 for proto in protocols: s = self.dumps(x, proto) self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s), proto >= 2) y = self.loads(s) self.assertEqual(y.abc, 666) self.assertEqual(x.__dict__, y.__dict__) def test_newobj_list_slots(self): x = SlotList([ 1, 2, 3]) x.foo = 42 x.bar = 'hello' s = self.dumps(x, 2) y = self.loads(s) self.assertEqual(list(x), list(y)) self.assertEqual(x.__dict__, y.__dict__) self.assertEqual(x.foo, y.foo) self.assertEqual(x.bar, y.bar) def test_reduce_overrides_default_reduce_ex(self): for proto in (0, 1, 2): x = REX_one() self.assertEqual(x._reduce_called, 0) s = self.dumps(x, proto) self.assertEqual(x._reduce_called, 1) y = self.loads(s) self.assertEqual(y._reduce_called, 0) def test_reduce_ex_called(self): for proto in (0, 1, 2): x = REX_two() self.assertEqual(x._proto, None) s = self.dumps(x, proto) self.assertEqual(x._proto, proto) y = self.loads(s) self.assertEqual(y._proto, None) def test_reduce_ex_overrides_reduce(self): for proto in (0, 1, 2): x = REX_three() self.assertEqual(x._proto, None) s = self.dumps(x, proto) self.assertEqual(x._proto, proto) y = self.loads(s) self.assertEqual(y._proto, None) class REX_one(object): _reduce_called = 0 def __reduce__(self): self._reduce_called = 1 return (REX_one, ()) class REX_two(object): _proto = None def __reduce_ex__(self, proto): self._proto = proto return (REX_two, ()) class REX_three(object): _proto = None def __reduce_ex__(self, proto): self._proto = proto return (REX_two, ()) def __reduce__(self): raise TestFailed, "This __reduce__ shouldn't be called" class MyInt(int): sample = 1 class MyLong(long): sample = 0x1L class MyFloat(float): sample = 1.0 class MyComplex(complex): sample = 1.0 + (0.0+0.0j) class MyStr(str): sample = 'hello' class MyUnicode(unicode): sample = u'hello ሴ' class MyTuple(tuple): sample = (1, 2, 3) class MyList(list): sample = [ 1, 2, 3] class MyDict(dict): sample = { 'a': 1, 'b': 2 } myclasses = [ MyInt, MyLong, MyFloat, MyComplex, MyStr, MyUnicode, MyTuple, MyList, MyDict] class SlotList(MyList): __slots__ = [ 'foo'] class SimpleNewObj(object): def __init__(self, a, b, c): raise TypeError("SimpleNewObj.__init__() didn't expect to get called") class AbstractPickleModuleTests(unittest.TestCase): def test_dump_closed_file(self): import os f = open(TESTFN, 'w') try: f.close() self.assertRaises(ValueError, self.module.dump, 123, f) finally: os.remove(TESTFN) def test_load_closed_file(self): import os f = open(TESTFN, 'w') try: f.close() self.assertRaises(ValueError, self.module.dump, 123, f) finally: os.remove(TESTFN) def test_highest_protocol(self): self.assertEqual(self.module.HIGHEST_PROTOCOL, 2) def test_callapi(self): StringIO = StringIO import cStringIO f = StringIO() self.module.dump(123, f, -1) self.module.dump(123, file = f, protocol = -1) self.module.dumps(123, -1) self.module.dumps(123, protocol = -1) self.module.Pickler(f, -1) self.module.Pickler(f, protocol = -1) class AbstractPersistentPicklerTests(unittest.TestCase): def persistent_id(self, object): if isinstance(object, int) and object % 2 == 0: self.id_count += 1 return str(object) else: return None def persistent_load(self, oid): self.load_count += 1 object = int(oid) if not object % 2 == 0: raise AssertionError self return object def test_persistence(self): self.id_count = 0 self.load_count = 0 L = range(10) self.assertEqual(self.loads(self.dumps(L)), L) self.assertEqual(self.id_count, 5) self.assertEqual(self.load_count, 5) def test_bin_persistence(self): self.id_count = 0 self.load_count = 0 L = range(10) self.assertEqual(self.loads(self.dumps(L, 1)), L) self.assertEqual(self.id_count, 5) self.assertEqual(self.load_count, 5)